package com.wudl.flink.core;

/**
 * @ClassName : JoinTestString
 * @Description : Flink demo that window-joins two randomly generated Tuple2 streams on their first field.
 * @Author :wudl
 * @Date: 2021-07-21 00:14
 */




import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Demo job that joins two randomly generated streams of {@code Tuple2<String, String>}
 * records on their first field ({@code f0}) inside 5-second tumbling processing-time
 * windows and prints every joined pair.
 */
public class JoinTestString {

    private static final Logger LOG = LoggerFactory.getLogger(JoinTestString.class);

    /** Pool of keys the sources pick from at random for the {@code f0} field. */
    private static final String[] TYPE = {"a", "b", "c", "d"};

    /**
     * Builds and runs the join pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Add custom sources; each one emits one order record {product name, product quantity} per second.
        DataStreamSource<Tuple2<String, String>> orderSource1 =
                env.addSource(new RandomOrderSource("orderSource1"), "orderSource1");
        DataStreamSource<Tuple2<String, String>> orderSource2 =
                env.addSource(new RandomOrderSource("orderSource2"), "orderSource2");

        orderSource1.join(orderSource2)
                .where(new FirstFieldKeySelector())
                .equalTo(new FirstFieldKeySelector())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new ConcatValuesJoin())
                .print();

        env.execute("Flink JoinTest");
    }

    /**
     * Source that, once per second, emits a tuple whose {@code f0} is a random entry of
     * {@link #TYPE} and whose {@code f1} is a random integer in {@code [0, 10)} rendered
     * as a string. Each emitted element is also echoed to standard output.
     */
    private static final class RandomOrderSource implements SourceFunction<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        /** Label used as the prefix of the line printed for every emitted element. */
        private final String sourceName;
        private final Random random = new Random();
        /** Cleared by {@link #cancel()} so that the emit loop in {@link #run} stops. */
        private volatile boolean isRunning = true;

        /**
         * @param sourceName label included in the line printed for each emitted element
         */
        RandomOrderSource(String sourceName) {
            this.sourceName = sourceName;
        }

        /**
         * Emits one random tuple per second until {@link #cancel()} is called.
         *
         * @param ctx context used to emit elements
         * @throws Exception if the sleep between two emissions is interrupted
         */
        @Override
        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
            while (isRunning) {
                // Sleep first, so the first element is emitted one second after start.
                TimeUnit.SECONDS.sleep(1);
                Tuple2<String, String> order = Tuple2.of(
                        TYPE[random.nextInt(TYPE.length)],
                        String.valueOf(random.nextInt(10)));
                System.out.println(new Date() + "," + sourceName + "提交元素:" + order);
                ctx.collect(order);
            }
        }

        /** Requests the emit loop to stop after the current iteration. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /** Key selector that uses the tuple's first field ({@code f0}) as the join key. */
    private static final class FirstFieldKeySelector
            implements KeySelector<Tuple2<String, String>, String> {

        private static final long serialVersionUID = 1L;

        /**
         * @param value tuple to extract the key from
         * @return the tuple's {@code f0} field
         */
        @Override
        public String getKey(Tuple2<String, String> value) throws Exception {
            return value.f0;
        }
    }

    /**
     * Join function producing {@code ("key:" + first.f0, first.f1 + "+" + second.f1)}
     * for each pair of elements handed to it.
     */
    private static final class ConcatValuesJoin
            implements JoinFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        /**
         * @param first  element from the first (left) input
         * @param second element from the second (right) input
         * @return tuple of the prefixed key and both {@code f1} values joined by {@code "+"}
         */
        @Override
        public Tuple2<String, String> join(Tuple2<String, String> first, Tuple2<String, String> second)
                throws Exception {
            // Combine the f1 values of the two records that share the same key.
            return Tuple2.of("key:" + first.f0, first.f1 + "+" + second.f1);
        }
    }
}